[AIRFLOW-2855] Check Cron Expression Validity in DagBag.process_file() #3698

Merged
merged 1 commit into from
Aug 8, 2018

Conversation

XD-DENG
Member

@XD-DENG XD-DENG commented Aug 5, 2018

Jira

  • My PR addresses the following Airflow Jira issues and references them in the PR title. For example, "[AIRFLOW-XXX] My Airflow PR"

Description

  • Here are some details about my PR, including screenshots of any UI changes:

The schedule_interval of a DAG can be either a timedelta or a Cron expression.

When it is a Cron expression, there is currently no mechanism to check its validity. If the expression itself is wrong, it causes failures when the methods following_schedule() and previous_schedule() are invoked, which affects scheduling.

However, the exceptions are only written to the logs. From the web UI, it is hard for users to identify the issue and its source, while no new task can be started (especially for users who are not very familiar with Cron).

It would be good to show an error message in the web UI when a DAG's Cron expression (as schedule_interval) cannot be parsed properly by croniter. This is implemented by adding these exceptions to the import_error metadata, whose entries are shown as error messages in the web UI.

Related tests are added as well.
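
As a rough illustration of the idea described above, here is a minimal sketch of "validate the Cron string, and on failure record a message per file". It does not use croniter or Airflow: check_cron() is a simplified stand-in that only understands "*" and plain numbers in a five-field expression, and record_cron_errors() and the file names passed to it are hypothetical. Only the "Invalid Cron expression: " prefix is taken from this PR.

```python
# Simplified stand-in for croniter: accepts only "*" or a plain number in each
# of the five fields. Real Cron syntax (ranges, steps, names) is much richer.
FIELD_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 7)]


def check_cron(expression):
    """Raise ValueError if the expression is not a valid five-field Cron string."""
    fields = expression.split()
    if len(fields) != 5:
        raise ValueError("expected 5 fields, got %d" % len(fields))
    for value, (low, high) in zip(fields, FIELD_RANGES):
        if value == "*":
            continue
        if not value.isdecimal() or not low <= int(value) <= high:
            raise ValueError("[%s] is not acceptable" % expression)


def record_cron_errors(schedules, import_errors):
    """Store one message per file whose schedule string fails validation.

    `schedules` is a hypothetical mapping of DAG file path -> schedule string.
    """
    for filepath, expression in schedules.items():
        try:
            check_cron(expression)
        except ValueError as cron_e:
            # Same message prefix as the one added in this PR.
            import_errors[filepath] = "Invalid Cron expression: " + str(cron_e)
    return import_errors


errors = record_cron_errors(
    {"good_dag.py": "0 0 * * *", "test_invalid_cron.py": "0 100 * * *"}, {}
)
print(errors)
```

In the actual change, the parsing is delegated to croniter and the messages are stored in DagBag.import_errors, so the web UI can display them.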

Screenshot
[Screenshot: screen shot 2018-08-05 at 10 23 46 pm]

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

Commits

  • My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation

  • In case of new functionality, my PR adds documentation that describes how to use it.
    • When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added.

Code Quality

  • Passes git diff upstream/master -u -- "*.py" | flake8 --diff

@XD-DENG XD-DENG force-pushed the patch-6 branch 5 times, most recently from 0fa05cf to 68f5f3d Compare August 6, 2018 02:17
@codecov-io

codecov-io commented Aug 6, 2018

Codecov Report

Merging #3698 into master will increase coverage by <.01%.
The diff coverage is 100%.


@@            Coverage Diff             @@
##           master    #3698      +/-   ##
==========================================
+ Coverage   77.57%   77.57%   +<.01%     
==========================================
  Files         204      204              
  Lines       15770    15776       +6     
==========================================
+ Hits        12233    12239       +6     
  Misses       3537     3537
Impacted Files Coverage Δ
airflow/models.py 88.61% <100%> (+0.02%) ⬆️

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 120f485...1a5936f. Read the comment docs.

self.import_errors[dag.full_filepath] = \
    "Invalid Cron expression: " + str(cron_e)
self.file_last_changed[dag.full_filepath] = \
    file_last_changed_on_disk
Member

I may be missing something but do we need to perform the check twice?

Member Author

One is for the .py DAG file, and the other is for the packaged DAG file (.zip file).

Member

Member Author

Hi @yrqls21, you're right. Thanks for pointing this out.

I'll modify accordingly.

@@ -1038,6 +1038,21 @@ def test_zip(self):
        dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, "test_zip.zip"))
        self.assertTrue(dagbag.get_dag("test_zip_dag"))

    def test_process_file_cron_validity_check(self):
Member

I guess we do still want to keep the file in 644 instead of 755 mode.

tests/models.py Outdated
dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, d))

for d in invalid_dag_files:
self.assertTrue(any([d in k and "Invalid Cron expression" in v
Member

NIT: I think we don't need any() here.

Member Author

The expression inside any() returns a list of Boolean values.
any() checks whether at least one element is True.

Member

Are we expecting more than one import error from importing the dedicated test DAG file?

Member Author

dagbag.import_errors is a dict whose keys are file paths and whose values are exception details.

What I did here is check that "the file is the one I intend to check" AND "the exception is the one I'm looking for". The length of this list of Boolean values equals the number of entries in the import_error metadata, which may contain errors other than those I'm checking here (I only try to identify invalid-Cron errors).
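
The check described in this comment can be sketched in isolation as follows. The dictionary contents are made up for illustration; only the two file names and the "Invalid Cron expression" marker come from this PR.

```python
# Hypothetical contents of dagbag.import_errors: file path -> exception details.
import_errors = {
    "/dags/test_invalid_cron.py": "Invalid Cron expression: bad hour",
    "/dags/test_zip_invalid_cron.zip": "Invalid Cron expression: bad minute",
    "/dags/broken_import.py": "No module named 'missing_module'",
}
invalid_dag_files = ["test_invalid_cron.py", "test_zip_invalid_cron.zip"]

for d in invalid_dag_files:
    # One Boolean per import_errors entry: right file AND right kind of error.
    matches = [d in k and "Invalid Cron expression" in v
               for k, v in import_errors.items()]
    # The list is as long as import_errors, unrelated errors included.
    assert len(matches) == len(import_errors)
    # any() is True as soon as one entry satisfies both conditions.
    assert any(matches)
```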

Member

Sorry, I missed the part where you are parsing the default DAG_HOME. Indeed, there might be other import errors. Would you please double-check what is processed by calling models.DagBag()? If "test_invalid_cron.py" and "test_zip_invalid_cron.zip" are included, I guess we don't need to reprocess them; if not, I guess we don't need to parse the default DAG_HOME at all.

Back to the original point: what originally felt a bit strange to me is that we can actually check the errors in O(n), but now we do it in O(n·m), where m is the number of import errors. Practically it makes no difference, which is why it is a NIT; feel free to take it or leave it.
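
One way to get the linear-time check mentioned here is to walk import_errors once and collect the affected file names in a set, then test membership per expected file. This is a sketch with made-up data, not the code that was merged; files_with_cron_error and missing are illustrative names.

```python
import os

# Hypothetical contents of dagbag.import_errors: file path -> exception details.
import_errors = {
    "/dags/test_invalid_cron.py": "Invalid Cron expression: bad hour",
    "/dags/test_zip_invalid_cron.zip": "Invalid Cron expression: bad minute",
    "/dags/broken_import.py": "No module named 'missing_module'",
}
invalid_dag_files = ["test_invalid_cron.py", "test_zip_invalid_cron.zip"]

# Single pass over import_errors: keep base names of files with a Cron error.
files_with_cron_error = {
    os.path.basename(path)
    for path, message in import_errors.items()
    if "Invalid Cron expression" in message
}

# Set membership is constant time on average, so this loop is linear in the
# number of expected files rather than files times errors.
missing = [d for d in invalid_dag_files if d not in files_with_cron_error]
print(missing)
```

With a list instead of a set, each membership test would scan the list, which is the quadratic behaviour pointed out later in this review.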

Member Author

@XD-DENG XD-DENG Aug 6, 2018

Hi @yrqls21, great points!

  • On your first point, you're right. It's not really necessary to re-parse these two files, i.e. the code below can be removed.
for d in invalid_dag_files:
    dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, d))

But I purposely keep it here to make clear to readers what I'm testing.

  • Your second point is great. I agree it makes no practical difference here, given that n and m will never be large. But I think I will change this part as well. It's always good to keep performance in mind.

Thanks for your review!

@@ -414,6 +416,16 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
self.bag_dag(dag, parent_dag=dag, root_dag=dag)
found_dags.append(dag)
found_dags += dag.subdags
if isinstance(dag._schedule_interval, six.string_types):
Member

Maybe put the check before we append the dag to found_dags (if that's what we do for other dags with import errors)?

@XD-DENG
Member Author

XD-DENG commented Aug 6, 2018

Thanks again, @yrqls21, for the review input. Very helpful for refining this PR!

I have updated the PR accordingly and made sure all tests pass.

Member

@KevinYang21 KevinYang21 left a comment

LGTM besides the NITs, summoning committers @Fokko @ashb @kaxil @aoen

tests/models.py Outdated
as schedule interval can be identified
"""
invalid_dag_files = ["test_invalid_cron.py", "test_zip_invalid_cron.zip"]
dagbag = models.DagBag()
Member

NIT: I understand that you want your test to show clearly that you are processing those two test files, so it makes sense to keep the dagbag.process_file line. Then I guess we can process something like an empty or made-up directory here. The test DAG files in the default DAG_HOME may grow, and then we have 2x parsing time growth for each additional file, and our CI is already painfully lengthy ;)

tests/models.py Outdated
for k, v in dagbag.import_errors.items()
if "Invalid Cron expression" in v]
for d in invalid_dag_files:
self.assertTrue(d in files_with_cron_error)
Member

Sorry for being picky here, just a NIT: it is still O(n^2) here since files_with_cron_error is a list :P

Member Author

@yrqls21 come on, n is 2 here LOL

But I do agree with your other point about the DagBag's dag_folder. What I have done in my latest commit is pass a temporary directory as dag_folder to DagBag, so that it does not scan all the test DAGs.

In addition, changing dag_folder of DagBag to a temp directory also makes the testing much easier. We simply need to check whether the number of errors grows from 0 to # of invalid Cron tests. No more big O LOL!
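
The shape of such a test can be sketched as below. FakeDagBag is a hypothetical stand-in written only for this illustration (the real test uses models.DagBag and the test DAG files); the point is the empty temporary dag_folder and the before/after count of import errors.

```python
import shutil
import tempfile


class FakeDagBag(object):
    """Hypothetical stand-in for DagBag: it only tracks import_errors."""

    def __init__(self, dag_folder):
        self.dag_folder = dag_folder
        self.import_errors = {}

    def process_file(self, filepath):
        # Pretend every file with "invalid_cron" in its name fails validation.
        if "invalid_cron" in filepath:
            self.import_errors[filepath] = "Invalid Cron expression: stub"


invalid_dag_files = ["test_invalid_cron.py", "test_zip_invalid_cron.zip"]

# An empty temporary directory means nothing else gets scanned up front.
empty_dir = tempfile.mkdtemp()
try:
    dagbag = FakeDagBag(dag_folder=empty_dir)
    errors_before = len(dagbag.import_errors)
    for d in invalid_dag_files:
        dagbag.process_file(d)
    errors_after = len(dagbag.import_errors)
finally:
    # Clean up the temporary directory whatever happens above.
    shutil.rmtree(empty_dir)

print(errors_before, errors_after)
```

Because the bag starts empty, comparing the two counts is enough; no per-file lookup in import_errors is needed.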

I do love your reviews! All valid and helpful points. Thanks!

Member

LOL I was just messing around in the holy spirit of perfectionism. We can even be more assertive if we know those two are import errors from the cron exception. Good job making this change to improve Airflow! Thank you!

(it's still O(n) tho ;))

Member Author

:-| Crazy now LOL

Thanks again! I look forward to your reviews of my future commits!

@XD-DENG
Member Author

XD-DENG commented Aug 7, 2018

Refined how we test this new feature, as suggested by @yrqls21 .

@KevinYang21
Member

KevinYang21 commented Aug 7, 2018 via email

Member

@feng-tao feng-tao left a comment

overall lgtm

tests/models.py Outdated
@@ -56,7 +56,7 @@
from airflow.utils.trigger_rule import TriggerRule
from mock import patch, ANY
from parameterized import parameterized
-from tempfile import NamedTemporaryFile
+from tempfile import NamedTemporaryFile, mkdtemp
Member

Small nit: could you move mkdtemp before NamedTemporaryFile, given that we put lower-case imports before upper-case ones (L57)?

Member Author

Updated as suggested. Thanks.

A DAG can be imported properly as a .py script,
but the Cron expression given as its "schedule_interval" may be
invalid, like "0 100 * * *".

This commit checks the validity of Cron expressions in DAG
files (.py) and packaged DAG files (.zip), and shows
exception messages in the web UI by adding these exceptions to
the metadata "import_error".
@feng-tao feng-tao merged commit d47580f into apache:master Aug 8, 2018
@feng-tao
Member

feng-tao commented Aug 8, 2018

thanks @XD-DENG @yrqls21

@KevinYang21
Member

TY @feng-tao!

@XD-DENG XD-DENG deleted the patch-6 branch August 8, 2018 02:21
lxneng pushed a commit to lxneng/incubator-airflow that referenced this pull request Aug 10, 2018
ashb pushed a commit to ashb/airflow that referenced this pull request Oct 4, 2018
ashb pushed a commit to ashb/airflow that referenced this pull request Oct 22, 2018
galak75 pushed a commit to VilledeMontreal/incubator-airflow that referenced this pull request Nov 23, 2018
aliceabe pushed a commit to aliceabe/incubator-airflow that referenced this pull request Jan 3, 2019
cfei18 pushed a commit to cfei18/incubator-airflow that referenced this pull request Jan 23, 2019
4 participants